// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package leafcert

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/consul/agent/cacheshim"
	"github.com/hashicorp/consul/lib"
)

// Notify registers a desire to be updated about changes to a cache result.
//
// It is a helper that abstracts code from performing their own "blocking" query
// logic against a cache key to watch for changes and to maintain the key in
// cache actively. It will continue to perform blocking Get requests until the
// context is canceled.
//
// The passed context must be canceled or timeout in order to free resources
// and stop maintaining the value in cache. Typically request-scoped resources
// do this but if a long-lived context like context.Background is used, then the
// caller must arrange for it to be canceled when the watch is no longer
// needed.
//
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
// fast enough it will block the notification loop. When the chan is later
// drained, watching resumes correctly. If the pause is longer than the
// cachetype's TTL, the result might be removed from the local cache. Even in
// this case though when the chan is drained again, the new Get will re-fetch
// the entry from servers and resume notification behavior transparently.
//
// The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and will be returned in all UpdateEvents generated by
// result of watching the specified request so the caller can set this to any
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (m *Manager) Notify(
	ctx context.Context,
	req *ConnectCALeafRequest,
	correlationID string,
	ch chan<- cacheshim.UpdateEvent,
) error {
	return m.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cacheshim.UpdateEvent) {
		select {
		case ch <- event:
		case <-ctx.Done():
		}
	})
}

// NotifyCallback allows you to receive notifications about changes to a cache
// result in the same way as Notify, but accepts a callback function instead of
// a channel.
func (m *Manager) NotifyCallback(
	ctx context.Context,
	req *ConnectCALeafRequest,
	correlationID string,
	cb cacheshim.Callback,
) error {
	if req.Key() == "" {
		return fmt.Errorf("a key is required")
	}
	// Lightweight copy this object so that manipulating req doesn't race.
	dup := *req
	req = &dup

	if req.MaxQueryTime <= 0 {
		req.MaxQueryTime = DefaultQueryTimeout
	}

	go m.notifyBlockingQuery(ctx, req, correlationID, cb)
	return nil
}

func (m *Manager) notifyBlockingQuery(
	ctx context.Context,
	req *ConnectCALeafRequest,
	correlationID string,
	cb cacheshim.Callback,
) {
	// Always start at 0 index to deliver the initial (possibly currently cached
	// value).
	index := uint64(0)
	failures := uint(0)

	for {
		// Check context hasn't been canceled
		if ctx.Err() != nil {
			return
		}

		// Blocking request
		req.MinQueryIndex = index
		newValue, meta, err := m.internalGet(ctx, req)

		// Check context hasn't been canceled
		if ctx.Err() != nil {
			return
		}

		// Check the index of the value returned in the cache entry to be sure it
		// changed
		if index == 0 || index < meta.Index {
			cb(ctx, cacheshim.UpdateEvent{
				CorrelationID: correlationID,
				Result:        newValue,
				Meta:          meta,
				Err:           err,
			})

			// Update index for next request
			index = meta.Index
		}

		var wait time.Duration
		// Handle errors with backoff. Badly behaved blocking calls that returned
		// a zero index are considered as failures since we need to not get stuck
		// in a busy loop.
		if err == nil && meta.Index > 0 {
			failures = 0
		} else {
			failures++
			wait = backOffWait(m.config, failures)

			m.logger.
				With("error", err).
				With("index", index).
				Warn("handling error in Manager.Notify")
		}

		if wait > 0 {
			select {
			case <-time.After(wait):
			case <-ctx.Done():
				return
			}
		}
		// Sanity check we always request blocking on second pass
		if err == nil && index < 1 {
			index = 1
		}
	}
}

func backOffWait(cfg Config, failures uint) time.Duration {
	if failures > cfg.LeafCertRefreshBackoffMin {
		shift := failures - cfg.LeafCertRefreshBackoffMin
		waitTime := cfg.LeafCertRefreshMaxWait
		if shift < 31 {
			waitTime = (1 << shift) * time.Second
		}
		if waitTime > cfg.LeafCertRefreshMaxWait {
			waitTime = cfg.LeafCertRefreshMaxWait
		}
		return waitTime + lib.RandomStagger(waitTime)
	}
	return 0
}
